package logging

import (
	"fmt"

	"github.com/apache/arrow/go/v17/arrow"
	"github.com/apache/arrow/go/v17/arrow/array"
	"github.com/apache/arrow/go/v17/arrow/memory"

	"github.com/feast-dev/feast/go/protos/feast/types"
	gotypes "github.com/feast-dev/feast/go/types"
)

// MemoryBuffer accumulates served-feature log rows in memory and
// periodically compacts them into columnar arrow records, which are
// held until they are flushed to a LogSink by writeBatch.
type MemoryBuffer struct {
	logs   []*Log                // row-oriented log entries awaiting compaction
	schema *FeatureServiceSchema // feast schema the logs conform to

	arrowSchema *arrow.Schema  // column layout derived from schema via getArrowSchema
	records     []arrow.Record // compacted columnar records awaiting flush
}

const (
	// Names of the metadata columns appended to every log record,
	// alongside the per-feature and per-entity columns.
	LOG_TIMESTAMP_FIELD  = "__log_timestamp"
	LOG_DATE_FIELD       = "__log_date"
	LOG_REQUEST_ID_FIELD = "__request_id"
	// RECORD_SIZE is the number of buffered log rows that triggers
	// compaction into a single arrow record (see Append).
	RECORD_SIZE = 1000
)

// NewMemoryBuffer creates an empty buffer for the given feature service
// schema, pre-computing the arrow schema that compacted records will use.
// It returns an error if any type in the schema has no arrow equivalent.
func NewMemoryBuffer(schema *FeatureServiceSchema) (*MemoryBuffer, error) {
	converted, err := getArrowSchema(schema)
	if err != nil {
		return nil, err
	}

	buf := &MemoryBuffer{
		schema:      schema,
		arrowSchema: converted,
	}
	buf.logs = make([]*Log, 0)
	buf.records = make([]arrow.Record, 0)
	return buf, nil
}

// writeBatch compacts any buffered log rows into columnar form and
// flushes all pending arrow records to the given sink. On success the
// pending record list is cleared (capacity retained); on sink failure
// the records are kept so a later call can retry the flush.
func (b *MemoryBuffer) writeBatch(sink LogSink) error {
	if len(b.logs) > 0 {
		if err := b.Compact(); err != nil {
			return err
		}
	}

	// Nothing compacted and nothing pending: flush is a no-op.
	if len(b.records) == 0 {
		return nil
	}

	if err := sink.Write(b.records); err != nil {
		return err
	}

	b.records = b.records[:0]
	return nil
}

// Append adds a single log row to the in-memory buffer and triggers
// compaction into a columnar arrow record once the buffer holds at
// least RECORD_SIZE rows. Returns any error from compaction.
//
// The threshold check uses >= rather than ==: if a previous Compact
// call failed, b.logs is left un-truncated and grows past RECORD_SIZE,
// after which an equality check would never fire again and the buffer
// would grow unboundedly until the next writeBatch.
func (b *MemoryBuffer) Append(log *Log) error {
	b.logs = append(b.logs, log)

	if len(b.logs) >= RECORD_SIZE {
		return b.Compact()
	}

	return nil
}

// Compact converts the currently buffered log rows into a single
// columnar arrow record, appends it to the pending record list, and
// clears the row buffer (keeping its capacity for reuse).
//
// Calling Compact with an empty buffer is a no-op, so external callers
// of this exported method cannot enqueue useless zero-row records.
func (b *MemoryBuffer) Compact() error {
	if len(b.logs) == 0 {
		return nil
	}

	rec, err := b.convertToArrowRecord()
	if err != nil {
		return err
	}

	b.records = append(b.records, rec)
	b.logs = b.logs[:0]
	return nil
}

// getArrowSchema builds the arrow schema used for compacted log records
// from the feature service schema. The column layout is:
//   - one column per join key,
//   - one column per request-data parameter,
//   - for each feature: a value column plus "<name>__timestamp"
//     (second-resolution) and "<name>__status" (int32) columns,
//   - trailing metadata columns: log timestamp (microsecond-resolution),
//     log date, and request id.
//
// Returns an error if any feast value type has no arrow equivalent.
func getArrowSchema(schema *FeatureServiceSchema) (*arrow.Schema, error) {
	// Final size is exactly computable: pre-size to avoid repeated growth.
	numFields := len(schema.JoinKeys) + len(schema.RequestData) + 3*len(schema.Features) + 3
	fields := make([]arrow.Field, 0, numFields)

	for _, joinKey := range schema.JoinKeys {
		arrowType, err := gotypes.ValueTypeEnumToArrowType(schema.JoinKeysTypes[joinKey])
		if err != nil {
			return nil, err
		}

		fields = append(fields, arrow.Field{Name: joinKey, Type: arrowType})
	}

	for _, requestParam := range schema.RequestData {
		arrowType, err := gotypes.ValueTypeEnumToArrowType(schema.RequestDataTypes[requestParam])
		if err != nil {
			return nil, err
		}

		fields = append(fields, arrow.Field{Name: requestParam, Type: arrowType})
	}

	for _, featureName := range schema.Features {
		arrowType, err := gotypes.ValueTypeEnumToArrowType(schema.FeaturesTypes[featureName])
		if err != nil {
			return nil, err
		}

		fields = append(fields, arrow.Field{Name: featureName, Type: arrowType})
		fields = append(fields, arrow.Field{
			Name: fmt.Sprintf("%s__timestamp", featureName),
			Type: arrow.FixedWidthTypes.Timestamp_s})
		fields = append(fields, arrow.Field{
			Name: fmt.Sprintf("%s__status", featureName),
			Type: arrow.PrimitiveTypes.Int32})
	}

	fields = append(fields, arrow.Field{Name: LOG_TIMESTAMP_FIELD, Type: arrow.FixedWidthTypes.Timestamp_us})
	fields = append(fields, arrow.Field{Name: LOG_DATE_FIELD, Type: arrow.FixedWidthTypes.Date32})
	fields = append(fields, arrow.Field{Name: LOG_REQUEST_ID_FIELD, Type: arrow.BinaryTypes.String})

	return arrow.NewSchema(fields, nil), nil
}

// convertToArrowRecord Takes memory buffer of logs in array row and converts them to columnar with generated fcoschema generated by GetFcoSchema
// and writes them to arrow table.
// Returns arrow table that contains all of the logs in columnar format.
// convertToArrowRecord converts the buffered row-oriented logs (b.logs)
// into a single columnar arrow.Record laid out per b.arrowSchema.
// Entity, request-data and feature *values* are first gathered into
// per-column proto slices and copied into the builders in a second
// pass; timestamp/status/metadata columns are appended directly while
// iterating rows. The returned record is retained and must eventually
// be Release()d by the consumer.
func (b *MemoryBuffer) convertToArrowRecord() (arrow.Record, error) {
	arrowMemory := memory.NewGoAllocator()
	numRows := len(b.logs)

	// columns: per-column proto values, filled row by row below.
	columns := make(map[string][]*types.Value)
	// fieldNameToIdx maps a column name to its builder index in the schema.
	fieldNameToIdx := make(map[string]int)
	for idx, field := range b.arrowSchema.Fields() {
		fieldNameToIdx[field.Name] = idx
	}

	builder := array.NewRecordBuilder(arrowMemory, b.arrowSchema)
	defer builder.Release()

	// Reserve space up front so the UnsafeAppend calls below are valid
	// (UnsafeAppend skips capacity checks and requires prior reservation).
	builder.Reserve(numRows)

	for rowIdx, logRow := range b.logs {
		// Entity values: logRow.EntityValue is positionally aligned with
		// b.schema.JoinKeys — assumes the log was built against this schema.
		for colIdx, joinKey := range b.schema.JoinKeys {
			if _, ok := columns[joinKey]; !ok {
				columns[joinKey] = make([]*types.Value, numRows)
			}
			columns[joinKey][rowIdx] = logRow.EntityValue[colIdx]
		}
		// Request-data values, aligned with b.schema.RequestData.
		for colIdx, requestParam := range b.schema.RequestData {
			if _, ok := columns[requestParam]; !ok {
				columns[requestParam] = make([]*types.Value, numRows)
			}
			columns[requestParam][rowIdx] = logRow.RequestData[colIdx]
		}
		// Feature values plus their per-feature timestamp/status columns,
		// aligned with b.schema.Features.
		for colIdx, featureName := range b.schema.Features {
			if _, ok := columns[featureName]; !ok {
				columns[featureName] = make([]*types.Value, numRows)
			}
			columns[featureName][rowIdx] = logRow.FeatureValues[colIdx]

			// Second-resolution timestamps, matching Timestamp_s in the schema.
			timestamp := arrow.Timestamp(logRow.EventTimestamps[colIdx].GetSeconds())
			timestampFieldIdx := fieldNameToIdx[fmt.Sprintf("%s__timestamp", featureName)]
			statusFieldIdx := fieldNameToIdx[fmt.Sprintf("%s__status", featureName)]

			builder.Field(timestampFieldIdx).(*array.TimestampBuilder).UnsafeAppend(timestamp)
			builder.Field(statusFieldIdx).(*array.Int32Builder).UnsafeAppend(int32(logRow.FeatureStatuses[colIdx]))
		}

		// Metadata columns: microsecond log timestamp (Timestamp_us in the
		// schema), the log date, and the request id.
		logTimestamp := arrow.Timestamp(logRow.LogTimestamp.UnixMicro())
		logDate := arrow.Date32FromTime(logRow.LogTimestamp)

		builder.Field(fieldNameToIdx[LOG_TIMESTAMP_FIELD]).(*array.TimestampBuilder).UnsafeAppend(logTimestamp)
		builder.Field(fieldNameToIdx[LOG_DATE_FIELD]).(*array.Date32Builder).UnsafeAppend(logDate)
		builder.Field(fieldNameToIdx[LOG_REQUEST_ID_FIELD]).(*array.StringBuilder).Append(logRow.RequestId)
	}

	// Second pass: copy the gathered proto value columns into their
	// builders. Map iteration order is random but harmless — each column
	// targets its own builder index.
	for columnName, protoArray := range columns {
		fieldIdx := fieldNameToIdx[columnName]
		err := gotypes.CopyProtoValuesToArrowArray(builder.Field(fieldIdx), protoArray)
		if err != nil {
			return nil, err
		}
	}

	return builder.NewRecord(), nil
}
